//package com.zhisheng.connectors.es6.utils;
//
//import org.apache.flink.api.java.utils.ParameterTool;
//import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
//import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
//import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;
//import org.apache.http.HttpHost;
//
//import java.net.MalformedURLException;
//import java.net.URL;
//import java.util.ArrayList;
//import java.util.List;
//
///**
// * Desc: ES sink utilities (parse ES hosts, addSink). TODO: index template & x-pack
// * Created by zhisheng on 2019/10/21 3:05 PM
// * blog: http://www.54tianzhisheng.cn/
// * WeChat official account: zhisheng
// */
//public class ESSinkUtil {
//    //es security constant
//    public static final String ES_SECURITY_ENABLE = "es.security.enable";
//    public static final String ES_SECURITY_USERNAME = "es.security.username";
//    public static final String ES_SECURITY_PASSWORD = "es.security.password";
//
//    /**
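//     * Builds an Elasticsearch sink and adds it to the given stream.
//     *
//     * @param hosts               ES hosts
//     * @param bulkFlushMaxActions bulk flush size (max actions per bulk request)
//     * @param parallelism         parallelism set on the sink
//     * @param data                stream the sink is added to
//     * @param func                sink function handed to the ElasticsearchSink builder
//     * @param parameterTool       currently unused in the method body
//     * @param <T>                 element type of the stream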
//     */
//    public static <T> void addSink(List<HttpHost> hosts, int bulkFlushMaxActions, int parallelism,
//                                   SingleOutputStreamOperator<T> data, ElasticsearchSinkFunction<T> func,
//                                   ParameterTool parameterTool) {
//        ElasticsearchSink.Builder<T> esSinkBuilder = new ElasticsearchSink.Builder<>(hosts, func);
//        esSinkBuilder.setBulkFlushMaxActions(bulkFlushMaxActions);
//        esSinkBuilder.setFailureHandler(new RetryRequestFailureHandler());
//        //todo:xpack security
//        data.addSink(esSinkBuilder.build()).setParallelism(parallelism);
//    }
//
//    /**
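//     * Parses the ES hosts from the configuration.
//     *
//     * @param hosts comma-separated entries, each either a URL starting with "http" or host:port
//     * @return one HttpHost per entry, in input order
//     * @throws MalformedURLException if a URL entry cannot be parsed, or a non-URL entry has no ":port" part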
//     */
//    public static List<HttpHost> getEsAddresses(String hosts) throws MalformedURLException {
//        String[] hostList = hosts.split(",");
//        List<HttpHost> addresses = new ArrayList<>();
//        for (String host : hostList) {
//            if (host.startsWith("http")) {
//                URL url = new URL(host);
//                addresses.add(new HttpHost(url.getHost(), url.getPort()));
//            } else {
//                String[] parts = host.split(":", 2);
//                if (parts.length > 1) {
//                    addresses.add(new HttpHost(parts[0], Integer.parseInt(parts[1])));
//                } else {
//                    throw new MalformedURLException("invalid elasticsearch hosts format");
//                }
//            }
//        }
//        return addresses;
//    }
//}
